#!/usr/bin/env python
# s3funnel - Multithreaded tool for performing operations on Amazon's S3
# Copyright (c) 2008 Andrey Petrov
#
# This module is part of s3funnel and is released under
# the MIT license: http://www.opensource.org/licenses/mit-license.php

"""
s3funnel is a multithreaded tool for performing operations on Amazon's S3.
"""

"""
TODO:
* Use callback to do an interactive progress bar
"""

import httplib
import logging
import os
import signal
import socket
import sys
import threading
import time
from glob import glob
from optparse import OptionParser

import boto
import boto.exception
import workerpool

# Module-level logger shared by the jobs and main(); main() attaches a
# stream handler to it and sets its level.
log = logging.getLogger(__name__)
# Set by the SIGINT handler installed in main(); the job-feeding loop checks
# it and stops queueing new jobs once it is set.
event_stop = threading.Event()

class S3ToolBox(object):
    """Bundle of S3 resources handed to a job's run() method.

    Attributes:
        conn:   the boto S3 connection opened with the given credentials.
        bucket: the bucket object looked up on that connection.
    """

    def __init__(self, aws_key, aws_secret_key, bucket):
        """Connect to S3 and look up ``bucket`` by name."""
        connection = boto.connect_s3(aws_key, aws_secret_key)
        self.conn = connection
        self.bucket = connection.get_bucket(bucket)

# Multithreaded jobs

class GetJob(workerpool.Job):
    """Job that downloads one S3 key into a local file of the same name.

    Args:
        key:    name of the S3 key; also used as the local file path.
        config: optional dict; ``config['retry']`` is the maximum number of
                attempts (default 5).
    """

    def __init__(self, key, config=None):
        config = config or {}
        self.key = key
        self.retries = config.get('retry', 5)

    def _discard_partial(self):
        """Best-effort removal of the local file left by a failed download."""
        try:
            os.unlink(self.key)
        except OSError:
            # Nothing was created (or it is already gone); nothing to clean.
            pass

    def run(self, toolbox):
        """Download the key using ``toolbox.bucket``.

        A server error aborts immediately. Transient errors (incomplete
        read, socket error, boto client error) are retried up to
        ``self.retries`` times, doubling the delay between attempts. On any
        final failure the partially written local file is removed.
        """
        k = toolbox.bucket.new_key(self.key)
        wait = 1
        for attempt in range(self.retries):
            try:
                # Note: This creates a file, even if the download fails
                k.get_contents_to_filename(self.key)
                log.info("Got: %s" % self.key)
                return
            except boto.exception.BotoServerError:
                log.error("Failed to get: %s" % self.key)
                self._discard_partial()  # Remove file since download failed
                return
            except (httplib.IncompleteRead, socket.error, boto.exception.BotoClientError) as e:
                log.warning("Caught exception: %s. Retrying..." % e.__class__.__name__)
                if attempt + 1 < self.retries:
                    # Exponential backoff: 1s, 2s, 4s, ... (no pointless
                    # sleep after the final attempt).
                    time.sleep(wait)
                    wait *= 2

        # Every attempt hit a transient error: report it and clean up.
        log.error("Failed to get: %s" % self.key)
        self._discard_partial()

class PutJob(workerpool.Job):
    """Job that uploads one local file to S3 under its base name.

    Args:
        path:   path of the local file to upload; the S3 key is
                ``os.path.basename(path)``.
        config: optional dict; ``config['retry']`` is the maximum number of
                attempts (default 5) and ``config['acl']`` is the canned ACL.
                Any ACL other than 'public-read' or 'private' falls back to
                'private'.
    """

    def __init__(self, path, config=None):
        config = config or {}
        self.key = os.path.basename(path)
        self.path = path
        self.retries = config.get('retry', 5)
        self.acl = config.get('acl')
        if self.acl not in ['public-read', 'private']:
            self.acl = 'private'

    def run(self, toolbox):
        """Upload the file using ``toolbox.bucket``.

        A server error aborts immediately. Transient errors (incomplete
        read, socket error, boto client error) are retried up to
        ``self.retries`` times, doubling the delay between attempts. The
        local source file is never modified or removed.
        """
        k = toolbox.bucket.new_key(self.key)
        wait = 1
        headers = {'x-amz-acl': self.acl}

        for attempt in range(self.retries):
            try:
                k.set_contents_from_filename(self.path, headers)
                log.info("Sent: %s" % self.key)
                return
            except boto.exception.BotoServerError:
                # Do NOT unlink anything here: the only local file involved
                # is the user's source file.
                log.error("Failed to put: %s" % self.key)
                return
            except (httplib.IncompleteRead, socket.error, boto.exception.BotoClientError) as e:
                log.warning("Caught exception: %s. Retrying..." % e.__class__.__name__)
                if attempt + 1 < self.retries:
                    # Exponential backoff: 1s, 2s, 4s, ... (no pointless
                    # sleep after the final attempt).
                    time.sleep(wait)
                    wait *= 2

        # Every attempt hit a transient error: report the failure.
        log.error("Failed to put: %s" % self.key)


# Singlethreaded methods

def perform_list(toolbox, start_key=None):
    """Print the name of each key in ``toolbox.bucket``, one per line.

    Args:
        toolbox:   object whose ``bucket`` attribute is iterable and yields
                   items with a ``name`` attribute.
        start_key: optional key name passed to ``bucket.list`` as the
                   ``marker``; when empty or None the whole bucket is
                   iterated. NOTE(review): S3 is documented to start listing
                   after the marker key -- confirm that is the desired
                   meaning of "start key".
    """
    if start_key:
        keys = toolbox.bucket.list(marker=start_key)
    else:
        keys = toolbox.bucket
    for entry in keys:
        # Single-argument call form prints identically on Python 2 and 3.
        print(entry.name)

def main():
    """Command-line entry point.

    Parses ``BUCKET OPERATION [OPTIONS] [FILE]...``, then either runs a
    single-threaded operation directly (``list``) or feeds one job per input
    item into a worker pool (``get``, ``put``).

    Input items come from, in order of precedence: the ``-i`` manifest
    (``-`` meaning stdin), stdin when no FILE arguments are given, or the
    FILE arguments (glob-expanded for ``put``). Blank lines are skipped.
    Returns None in all cases; argument errors exit via ``parser.error``.
    """
    # Parse the command line...
    usage = "%prog BUCKET OPERATION [OPTIONS] [FILE]...\n" + __doc__
    parser = OptionParser(usage)
    parser.add_option("-a", "--aws_key", dest="aws_key", type="string",
                        help="Overrides AWS_ACCESS_KEY_ID environment variable")
    parser.add_option("-s", "--aws_secret_key", dest="aws_secret_key", type="string",
                        help="Overrides AWS_SECRET_ACCESS_KEY environment variable")
    # `%default` is expanded by optparse, so the help can never drift from
    # the real default again.
    parser.add_option("-t", "--threads", dest="numthreads", default=1, type="int", metavar="N",
                        help="Number of threads to use (default: %default)")
    parser.add_option("--start_key", dest="start_key", type="string", default=None,
                        help="(`list` only) Start key for list operation")
    parser.add_option("--acl", dest="acl", type="string", default="public-read",
                        help="(`put` only) Set the ACL permission for each file (default: public-read)")
    parser.add_option("-i", "--input", dest="input", type="string", metavar="FILE",
                        help="Read one file per line from a FILE manifest.")
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=None,
                        help="Enable verbose output")
    # TODO:
    #   (-T, --timeout) socket timeout

    options, args = parser.parse_args()

    # Check input
    aws_key = os.environ.get("AWS_ACCESS_KEY_ID")
    aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")

    ## AWS
    if options.aws_key:
        aws_key = options.aws_key
    if options.aws_secret_key:
        aws_secret_key = options.aws_secret_key
    if None in [aws_key, aws_secret_key]:
        parser.error("Missing required arguments `aws_key` or `aws_secret_key`")

    ## Threads
    if options.numthreads < 1:
        parser.error("`threads` must be at least 1")

    ## Arguments
    if len(args) < 1:
        parser.error("BUCKET not specified")
    bucket = args[0]
    if len(args) < 2:
        parser.error("OPERATION not specified")
    operation = args[1].lower()
    if operation not in ["get", "put", "list", "delete"]:
        parser.error("OPERATION must be one of: get, put, list, or delete")

    # Setup factories
    def toolbox_factory():
        return S3ToolBox(aws_key, aws_secret_key, bucket)
    def worker_factory(job_queue):
        return workerpool.EquippedWorker(job_queue, toolbox_factory)

    # Setup logging
    # TODO: Rewrite this to use different handlers
    level = logging.INFO
    if options.verbose:
        level = logging.DEBUG
    log.addHandler(logging.StreamHandler())
    log.setLevel(level)

    # Setup operation mapping
    job_map = {'get': GetJob,
               'put': PutJob}
    method_map = {'list': perform_list}

    if operation in method_map:
        # Single-threaded operations run directly; forward --start_key so
        # the option is actually honoured.
        return method_map[operation](toolbox_factory(), options.start_key)

    if operation not in job_map:
        parser.error("Operation `%s' not implemented yet." % operation)

    # Get data source
    input_src = None
    manifest = None  # file we opened ourselves and therefore must close
    if options.input:
        # Get source from manifest or stdin (via -i flag)
        if options.input == '-':
            input_src = "stdin"
            data = sys.stdin
        else:
            try:
                manifest = open(glob(options.input)[0])
            except (IOError, IndexError):
                # IndexError: the glob matched nothing.
                log.error("%s: File not found" % options.input)
                return
            data = manifest
            input_src = "`%s'" % options.input
    elif len(args) < 3:
        # Get source from stdin
        input_src = "stdin"
        data = sys.stdin
    else:
        if operation == 'put':
            # Get source from glob-expanded arguments
            data = []
            for arg in args[2:]:
                found = glob(arg)
                if not found:
                    log.error("%s: No such file." % arg)
                    continue
                data += found
        else:
            data = args[2:]
        input_src = "arguments: %s" % ', '.join(data)

    # Setup operation configuration
    config = {'acl': options.acl,
              'start_key': options.start_key,
              }

    # Fire up the thread pool
    log.info("Starting pool with %d threads..." % options.numthreads)
    try:
        pool = workerpool.WorkerPool(options.numthreads, maxjobs=options.numthreads*2, worker_factory=worker_factory)
    except Exception as e:
        # Not every exception carries status/reason, so fall back gracefully
        # instead of raising AttributeError while reporting the failure.
        log.critical("Failed starting workers: <%s> %s: %s" % (
            e.__class__.__name__, getattr(e, 'status', ''), getattr(e, 'reason', e)))
        if manifest is not None:
            manifest.close()
        return

    # Setup interrupt handling
    def shutdown(signum, stack):
        log.warning("Interrupted, shutting down...")
        event_stop.set()
    signal.signal(signal.SIGINT, shutdown)

    # Start feeding jobs into the workers
    log.info("Using files from %s" % input_src)
    Job = job_map[operation]
    try:
        for item in data:
            if event_stop.isSet():
                break
            item = item.strip()
            if not item:
                # Blank manifest/stdin lines would become jobs with an
                # empty key; skip them.
                continue
            pool.put(Job(item, config))
    except IOError:
        log.warning("Aborted.")
    finally:
        if manifest is not None:
            manifest.close()

    pool.shutdown()

# Run the command-line interface only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()
